/**
 * Copyright (c) 2023 murenchao
 * taomu is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *       http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
package cool.taomu.mqtt.broker.factory

import cool.taomu.mqtt.broker.entity.MessageEntity
import cool.taomu.mqtt.broker.impl.PublishObservable
import cool.taomu.mqtt.broker.impl.Retain
import cool.taomu.mqtt.broker.impl.RetainObservable
import cool.taomu.mqtt.broker.utils.MqttUtils
import cool.taomu.mqtt.broker.utils.impl.DataStorage
import cool.taomu.utils.inter.IObservable
import cool.taomu.utils.inter.IObserver
import cool.taomu.utils.inter.IStorage
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.mqtt.MqttFixedHeader
import io.netty.handler.codec.mqtt.MqttMessage
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader
import io.netty.handler.codec.mqtt.MqttMessageType
import io.netty.handler.codec.mqtt.MqttPubAckMessage
import io.netty.handler.codec.mqtt.MqttPublishMessage
import io.netty.handler.codec.mqtt.MqttQoS
import io.netty.util.ReferenceCountUtil
import org.slf4j.LoggerFactory

class PublishRequest implements IProcess {
	val static LOG = LoggerFactory.getLogger(PublishRequest);

	IStorage cache = new DataStorage();

	val static IObservable<IObserver> retainObservable = RetainObservable.instance;
	val static IObservable<IObserver> observable = PublishObservable.instance;

	override request(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
		try {
			if(!(mqttMessage instanceof MqttPublishMessage)) return;
			var publishMessage = mqttMessage as MqttPublishMessage;
			var message = new MessageEntity();
			message.senderId = MqttUtils.getClientId(ctx.channel)
			LOG.info("执行了MQTT Publish 命令 : " + message.senderId);
			var qos = publishMessage.fixedHeader.qosLevel;
			message.qos = qos.ordinal;
			message.topic = publishMessage.variableHeader.topicName;
			message.payload = (mqttMessage as MqttPublishMessage).payload;
			message.type = mqttMessage.fixedHeader.messageType.value;
			message.dup = publishMessage.fixedHeader.isDup;
			message.retain = publishMessage.fixedHeader.isRetain;
			message.msgId = publishMessage.variableHeader.packetId;
			message.senderChannel = ctx.channel;
			switch (qos) {
				case EXACTLY_ONCE,
				case AT_MOST_ONCE: {
					LOG.info(String.format("Qos0 and Qos2 message,clientId=%s", message.senderId));
					retainMessage(message);
					publishMessage(message);
				}
				case AT_LEAST_ONCE: {
					LOG.info(String.format("Qos1 message,clientId=%s", message.senderId));
					retainMessage(message);
					publishMessage(message);
					// 回给发送者
					var header = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
					var varHeader = MqttMessageIdVariableHeader.from(message.msgId);
					ctx.writeAndFlush(new MqttPubAckMessage(header, varHeader));
				}
				default: {
					LOG.info(String.format("Wrong mqtt message,clientId=%s", message.senderId));
				}
			}
		} catch (Exception ex) {
			LOG.debug("执行了MQTT Publish 命令出错了 : ", ex);
		} finally {
			ReferenceCountUtil.release(mqttMessage.payload());
		}
	}

	def retainMessage(MessageEntity message) {
		LOG.debug(
			"clientId 为 {} 是否存在 Retain 数据 {}, 接受到的数据为 {} ",
			message.senderId,
			message.retain,
			new String(message.payload)
		);
		// 存储消息
		cache.put("mqtt-message",message.senderId, message);
		// 需要功能验证 Retain 需要根据官方文档进行修改
		if (message.retain) {
			var qos = message.qos;
			var payload = message.payload;
			// msgVal.clientId 为发送者的clientId
			if (qos === MqttQoS.AT_MOST_ONCE.ordinal || payload === null || payload.length === 0) {
				LOG.info("清空 clientId 为 {} 的Retain数据", message.senderId);
				retainObservable.unregister(#[message.senderId, message.topic].join("-"));
			} else {
				LOG.info("保存 clientId 为 {} 的Retain数据", message.senderId);
				retainObservable.register(#[message.senderId, message.topic].join("-"), new Retain(message));
			}
		}
	}

	def publishMessage(MessageEntity message) {
		LOG.debug("推送消息");
		// 推送用户订阅
		observable.publish(message);
	}
}
